package com.atguigu.flink.chapter07;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Date;

/**
 * Demonstrates a keyed tumbling processing-time window.
 *
 * <p>The job reads text lines from a socket, splits every line into words on single spaces,
 * keys the stream by word, assigns the elements to 5-second tumbling processing-time windows
 * and prints, for each key and window, the window bounds together with the words collected
 * in that window.
 *
 * <p>Created 2021/12/15 9:26.
 *
 * @author lizhenchao@atguigu.cn
 */
public class Flink01_Window_Tumbling {
    /**
     * Builds the streaming pipeline and runs it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Pin the "rest.port" setting so the job's REST endpoint is reachable on a known port.
        conf.setInteger("rest.port", 20000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env
            .socketTextStream("hadoop162", 9999)
            .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                /**
                 * Emits one {@code (word, 1)} pair per non-empty word of the line.
                 */
                @Override
                public void flatMap(String value,
                                    Collector<Tuple2<String, Long>> out) throws Exception {
                    for (String word : value.split(" ")) {
                        // A blank line, a leading space or consecutive spaces make split(" ")
                        // produce empty tokens; skip them so no window is opened for key "".
                        if (word.isEmpty()) {
                            continue;
                        }
                        out.collect(Tuple2.of(word, 1L));
                    }
                }
            })
            .keyBy(t -> t.f0)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                /**
                 * Invoked when the window fires, i.e. once the time has reached the end of the
                 * window: the window is closed and this computation is triggered.
                 *
                 * @param key      the word this window is keyed by
                 * @param ctx      context giving access to the window metadata
                 * @param elements all elements stored in the current window
                 * @param out      collector receiving the formatted window summary
                 */
                @Override
                public void process(String key,
                                    Context ctx,
                                    Iterable<Tuple2<String, Long>> elements,
                                    Collector<String> out) throws Exception {
                    Date start = new Date(ctx.window().getStart());
                    Date end = new Date(ctx.window().getEnd());

                    // Gather the word of every element that fell into this window.
                    ArrayList<String> words = new ArrayList<>();
                    for (Tuple2<String, Long> element : elements) {
                        words.add(element.f0);
                    }

                    out.collect("窗口:" + start + "  " + end + "  " + words);
                }
            })
            .print();

        env.execute();
    }
}
